{
  "cells": [
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Interactive Phi 3 Mini 4K Instruct Chatbot with Whisper\n",
        "\n",
        "### Introduction:\n",
        "The Interactive Phi 3 Mini 4K Instruct Chatbot is a tool that allows users to interact with the Microsoft Phi 3 Mini 4K instruct demo using text or audio input. The chatbot can be used for a variety of tasks, such as translation, weather updates, and general information gathering."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "cellView": "form",
        "id": "Atl_WEmtR0Yd"
      },
      "outputs": [],
      "source": [
        "# Install the required Python packages\n",
        "!pip install accelerate\n",
        "!pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118\n",
        "# Skip the local CUDA build of flash-attn by setting the environment variable for this command\n",
        "!FLASH_ATTENTION_SKIP_CUDA_BUILD=TRUE pip install flash-attn --no-build-isolation\n",
        "!pip install transformers\n",
        "!pip install wheel\n",
        "!pip install gradio\n",
        "!pip install pydub==0.25.1\n",
        "!pip install edge-tts\n",
        "!pip install openai-whisper==20231117\n",
        "!pip install ffmpeg==1.4\n",
        "# from IPython.display import clear_output\n",
        "# clear_output()"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# Check whether CUDA support is available\n",
        "# Output True = CUDA is available\n",
        "# Output False = no CUDA (a CUDA-enabled PyTorch install and a GPU are required to run the model on GPU)\n",
        "import os \n",
        "import torch\n",
        "print(torch.cuda.is_available())\n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "MKAUp20H4ZXl"
      },
      "source": [
        "[Create your Hugging Face access token](https://huggingface.co/settings/tokens)\n",
        "\n",
        "1. Create a new token.\n",
        "2. Give the token a name.\n",
        "3. Select write permissions.\n",
        "4. Copy the token and save it in a safe place.\n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "The following Python code performs two main tasks: importing the `os` module and setting an environment variable.\n",
        "\n",
        "1. Importing the `os` module:\n",
        "   - The `os` module in Python provides a way to interact with the operating system. It allows you to perform various operating system-related tasks, such as accessing environment variables, working with files and directories, etc.\n",
        "   - In this code, the `os` module is imported using the `import` statement. This statement makes the functionality of the `os` module available for use in the current Python script.\n",
        "\n",
        "2. Setting an environment variable:\n",
        "   - An environment variable is a value that can be accessed by programs running on the operating system. It is a way to store configuration settings or other information that can be used by multiple programs.\n",
        "   - In this code, a new environment variable is being set using the `os.environ` dictionary. The key of the dictionary is `'HF_TOKEN'`, and the value is assigned from the `HUGGINGFACE_TOKEN` variable.\n",
        "   - The `HUGGINGFACE_TOKEN` variable is defined on the line just above the assignment. Its value is a placeholder string that you replace with your own token (Hugging Face tokens start with `hf_`). The `#@param` syntax is used in Google Colab notebooks to expose a variable as a form field, so the value can be entered directly in the notebook interface.\n",
        "   - By setting the `'HF_TOKEN'` environment variable, the token becomes available to the rest of this Python process and to any child processes it starts, including the Hugging Face libraries that read `HF_TOKEN` when downloading models.\n",
        "\n",
        "Overall, this code imports the `os` module and sets an environment variable named `'HF_TOKEN'` with the value provided in the `HUGGINGFACE_TOKEN` variable."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "cellView": "form",
        "id": "N5r2ikbwR68c"
      },
      "outputs": [],
      "source": [
        "import os\n",
        "# Enter your Hugging Face token in the field below,\n",
        "# then add it to the environment variables\n",
        "HUGGINGFACE_TOKEN = \"Enter Hugging Face Key\" #@param {type:\"string\"}\n",
        "os.environ['HF_TOKEN'] = HUGGINGFACE_TOKEN"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "The next cell downloads and loads the two models used by the chatbot, and it ends with a commented-out call to `clear_output`. Let's break down the code and understand its functionality:\n",
        "\n",
        "The Phi-3-mini-4k-instruct model is loaded with `AutoModelForCausalLM.from_pretrained`, placed on the GPU with `device_map=\"cuda\"`, and paired with its tokenizer from `AutoTokenizer.from_pretrained`. The Whisper `tiny` model is then loaded with `whisper.load_model` for speech-to-text; `base` is listed as an alternative.\n",
        "\n",
        "The `clear_output` function comes from `IPython.display` and clears the output of the current cell. In Jupyter Notebook or IPython, when a cell produces output, such as printed text or graphical plots, that output is displayed below the cell; `clear_output` removes it.\n",
        "\n",
        "`clear_output` takes one parameter called `wait`, a boolean that defaults to `False`. When `wait` is `True`, the existing output is not cleared until new output is available to replace it.\n",
        "\n",
        "The import and the call are commented out in the cell. Uncommenting them hides the download progress messages once the models have loaded, which keeps the notebook output tidy during interactive sessions.\n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "cellView": "form",
        "id": "nmXm0dxuRinA"
      },
      "outputs": [],
      "source": [
        "# Download Phi-3-mini-4k-instruct model & Whisper Tiny\n",
        "import torch\n",
        "from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline\n",
        "\n",
        "torch.random.manual_seed(0)\n",
        "\n",
        "model = AutoModelForCausalLM.from_pretrained(\n",
        "    \"microsoft/Phi-3-mini-4k-instruct\",\n",
        "    device_map=\"cuda\",\n",
        "    torch_dtype=\"auto\",\n",
        "    trust_remote_code=True,\n",
        ")\n",
        "tokenizer = AutoTokenizer.from_pretrained(\"microsoft/Phi-3-mini-4k-instruct\")\n",
        "\n",
        "#whisper for speech to text()\n",
        "import whisper\n",
        "select_model =\"tiny\" # ['tiny', 'base']\n",
        "whisper_model = whisper.load_model(select_model)\n",
        "\n",
        "#from IPython.display import clear_output\n",
        "#clear_output()"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "The next cell performs text-to-speech (TTS) using the Edge TTS service. Let's go through the relevant function implementations one by one:\n",
        "\n",
        "1. `calculate_rate_string(input_value)`: This function takes an input value and calculates the rate string for the TTS voice. The input value represents the desired speed of the speech, where a value of 1 represents the normal speed. The function calculates the rate string by subtracting 1 from the input value, multiplying it by 100, and then determining the sign based on whether the input value is greater than or equal to 1. The function returns the rate string in the format \"{sign}{rate}\", for example \"+50\" for an input value of 1.5.\n",
        "\n",
        "2. `make_chunks(input_text, language)`: This function takes an input text and a language as parameters. It splits the input text into chunks based on the language-specific rules. In this implementation, if the language is \"English\", the function splits the text at each period (\".\") and removes any leading or trailing whitespace. It then appends a period to each chunk and returns the filtered list of chunks.\n",
        "\n",
        "3. `tts_file_name(text)`: This function generates a file name for the TTS audio file based on the input text. It performs several transformations on the text: removing a trailing period (if present), converting the text to lowercase, stripping leading and trailing whitespace, and replacing spaces with underscores. It then truncates the text to a maximum of 25 characters, or uses the placeholder \"empty\" if the text is empty. Finally, it generates a random string using the `uuid` module and combines it with the truncated text to create the file name in the format \"/content/edge_tts_voice/{truncated_text}_{random_string}.mp3\".\n",
        "\n",
        "4. `merge_audio_files(audio_paths, output_path)`: This function merges multiple audio files into a single audio file. It takes a list of audio file paths and an output path as parameters. The function initializes an empty `AudioSegment` object called `merged_audio`. It then iterates through each audio file path, loads the audio file using the `AudioSegment.from_file()` method from the `pydub` library, and appends the current audio file to the `merged_audio` object. Finally, it exports the merged audio to the specified output path in the MP3 format.\n",
        "\n",
        "5. `edge_free_tts(chunks_list, speed, voice_name, save_path)`: This function performs the TTS operation using the Edge TTS service. It takes a list of text chunks, the speed of the speech, the voice name, and the save path as parameters. If the number of chunks is greater than 1, the function creates a directory for storing the individual chunk audio files. It then iterates through each chunk, constructs an `edge-tts` command using the `calculate_rate_string()` function, the voice name, and the chunk text, and executes the command using the `os.system()` function. If the command execution is successful, it appends the path of the generated audio file to a list. After processing all the chunks, it merges the individual audio files using the `merge_audio_files()` function and saves the merged audio to the specified save path. If there is only one chunk, it directly generates the `edge-tts` command and saves the audio to the save path. Finally, it returns the save path of the generated audio file.\n",
        "\n",
        "6. `random_audio_name_generate()`: This function generates a random audio file name using the `uuid` module. It generates a random UUID, converts it to a string, takes the first 8 characters, appends the \".mp3\" extension, and returns the random audio file name.\n",
        "\n",
        "7. `talk(input_text)`: This function is the main entry point for performing the TTS operation. It takes an input text as a parameter. It first checks the length of the input text to determine if it is a long sentence (greater than or equal to 600 characters). Based on the length and the value of the `translate_text_flag` variable, it determines the language and generates the list of text chunks using the `make_chunks()` function. It then generates a save path for the audio file using the `random_audio_name_generate()` function. Finally, it calls the `edge_free_tts()` function to perform the TTS operation and returns the save path of the generated audio file.\n",
        "\n",
        "Overall, these functions work together to split the input text into chunks, generate a file name for the audio file, perform the TTS operation using the Edge TTS service, and merge the individual audio files into a single audio file."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "cellView": "form",
        "colab": {
          "base_uri": "https://localhost:8080/",
          "height": 93
        },
        "id": "Mv4WVhNUz4IL",
        "outputId": "7f177f73-3eb1-4d7c-d5e9-1e7cabe32f63"
      },
      "outputs": [],
      "source": [
        "#@title Edge TTS\n",
        "def calculate_rate_string(input_value):\n",
        "    # round() avoids float truncation errors, e.g. (1.2 - 1) * 100 evaluates to 19.999999999999996\n",
        "    rate = round((input_value - 1) * 100)\n",
        "    sign = '+' if input_value >= 1 else '-'\n",
        "    return f\"{sign}{abs(rate)}\"\n",
        "\n",
        "\n",
        "def make_chunks(input_text, language):\n",
        "    language=\"English\"\n",
        "    if language == \"English\":\n",
        "      temp_list = input_text.strip().split(\".\")\n",
        "      filtered_list = [element.strip() + '.' for element in temp_list[:-1] if element.strip() and element.strip() != \"'\" and element.strip() != '\"']\n",
        "      if temp_list[-1].strip():\n",
        "          filtered_list.append(temp_list[-1].strip())\n",
        "      return filtered_list\n",
        "\n",
        "\n",
        "import re\n",
        "import uuid\n",
        "def tts_file_name(text):\n",
        "    if text.endswith(\".\"):\n",
        "        text = text[:-1]\n",
        "    text = text.lower()\n",
        "    text = text.strip()\n",
        "    text = text.replace(\" \",\"_\")\n",
        "    truncated_text = text[:25] if len(text) > 25 else text if len(text) > 0 else \"empty\"\n",
        "    random_string = uuid.uuid4().hex[:8].upper()\n",
        "    file_name = f\"/content/edge_tts_voice/{truncated_text}_{random_string}.mp3\"\n",
        "    return file_name\n",
        "\n",
        "\n",
        "from pydub import AudioSegment\n",
        "import shutil\n",
        "import os\n",
        "def merge_audio_files(audio_paths, output_path):\n",
        "    # Initialize an empty AudioSegment\n",
        "    merged_audio = AudioSegment.silent(duration=0)\n",
        "\n",
        "    # Iterate through each audio file path\n",
        "    for audio_path in audio_paths:\n",
        "        # Load the audio file using Pydub\n",
        "        audio = AudioSegment.from_file(audio_path)\n",
        "\n",
        "        # Append the current audio file to the merged_audio\n",
        "        merged_audio += audio\n",
        "\n",
        "    # Export the merged audio to the specified output path\n",
        "    merged_audio.export(output_path, format=\"mp3\")\n",
        "\n",
        "def edge_free_tts(chunks_list,speed,voice_name,save_path):\n",
        "  # print(chunks_list)\n",
        "  if len(chunks_list)>1:\n",
        "    chunk_audio_list=[]\n",
        "    if os.path.exists(\"/content/edge_tts_voice\"):\n",
        "      shutil.rmtree(\"/content/edge_tts_voice\")\n",
        "    os.mkdir(\"/content/edge_tts_voice\")\n",
        "    k=1\n",
        "    for i in chunks_list:\n",
        "      print(i)\n",
        "      edge_command=f'edge-tts  --rate={calculate_rate_string(speed)}% --voice {voice_name} --text \"{i}\" --write-media /content/edge_tts_voice/{k}.mp3'\n",
        "      print(edge_command)\n",
        "      var1=os.system(edge_command)\n",
        "      if var1==0:\n",
        "        pass\n",
        "      else:\n",
        "        print(f\"Failed: {i}\")\n",
        "      chunk_audio_list.append(f\"/content/edge_tts_voice/{k}.mp3\")\n",
        "      k+=1\n",
        "    # print(chunk_audio_list)\n",
        "    merge_audio_files(chunk_audio_list, save_path)\n",
        "  else:\n",
        "    edge_command=f'edge-tts  --rate={calculate_rate_string(speed)}% --voice {voice_name} --text \"{chunks_list[0]}\" --write-media {save_path}'\n",
        "    print(edge_command)\n",
        "    var2=os.system(edge_command)\n",
        "    if var2==0:\n",
        "      pass\n",
        "    else:\n",
        "      print(f\"Failed: {chunks_list[0]}\")\n",
        "  return save_path\n",
        "\n",
        "# text = \"This is Microsoft Phi 3 mini 4k instruct Demo\" Simply update the text variable with the text you want to convert to speech\n",
        "text = 'This is Microsoft Phi 3 mini 4k instruct Demo'  # @param {type: \"string\"}\n",
        "Language = \"English\" # @param ['English']\n",
        "# Gender of voice simply change from male to female and choose the voice you want to use\n",
        "Gender = \"Female\"# @param ['Male', 'Female']\n",
        "female_voice=\"en-US-AriaNeural\"# @param[\"en-US-AriaNeural\",'zh-CN-XiaoxiaoNeural','zh-CN-XiaoyiNeural']\n",
        "speed = 1  # @param {type: \"number\"}\n",
        "translate_text_flag  = False\n",
        "if len(text)>=600:\n",
        "  long_sentence = True\n",
        "else:\n",
        "  long_sentence = False\n",
        "\n",
        "# long_sentence = False # @param {type:\"boolean\"}\n",
        "save_path = ''  # @param {type: \"string\"}\n",
        "if len(save_path)==0:\n",
        "  save_path=tts_file_name(text)\n",
        "if Language == \"English\" :\n",
        "  if Gender==\"Male\":\n",
        "    voice_name=\"en-US-ChristopherNeural\"\n",
        "  if Gender==\"Female\":\n",
        "    voice_name=female_voice\n",
        "    # voice_name=\"en-US-AriaNeural\"\n",
        "\n",
        "\n",
        "if translate_text_flag:\n",
        "  input_text=text\n",
        "  # input_text=translate_text(text, Language)\n",
        "  # print(\"Translateting\")\n",
        "else:\n",
        "  input_text=text\n",
        "if long_sentence==True and translate_text_flag==True:\n",
        "  chunks_list=make_chunks(input_text,Language)\n",
        "elif long_sentence==True and translate_text_flag==False:\n",
        "  chunks_list=make_chunks(input_text,\"English\")\n",
        "else:\n",
        "  chunks_list=[input_text]\n",
        "# print(chunks_list)\n",
        "# edge_save_path=edge_free_tts(chunks_list,speed,voice_name,save_path)\n",
        "# from IPython.display import clear_output\n",
        "# clear_output()\n",
        "# from IPython.display import Audio\n",
        "# Audio(edge_save_path, autoplay=True)\n",
        "\n",
        "from IPython.display import clear_output\n",
        "from IPython.display import Audio\n",
        "if not os.path.exists(\"/content/audio\"):\n",
        "    os.mkdir(\"/content/audio\")\n",
        "import uuid\n",
        "def random_audio_name_generate():\n",
        "  random_uuid = uuid.uuid4()\n",
        "  audio_extension = \".mp3\"\n",
        "  random_audio_name = str(random_uuid)[:8] + audio_extension\n",
        "  return random_audio_name\n",
        "def talk(input_text):\n",
        "  global translate_text_flag,Language,speed,voice_name\n",
        "  if len(input_text)>=600:\n",
        "    long_sentence = True\n",
        "  else:\n",
        "    long_sentence = False\n",
        "\n",
        "  if long_sentence==True and translate_text_flag==True:\n",
        "    chunks_list=make_chunks(input_text,Language)\n",
        "  elif long_sentence==True and translate_text_flag==False:\n",
        "    chunks_list=make_chunks(input_text,\"English\")\n",
        "  else:\n",
        "    chunks_list=[input_text]\n",
        "  save_path=\"/content/audio/\"+random_audio_name_generate()\n",
        "  edge_save_path=edge_free_tts(chunks_list,speed,voice_name,save_path)\n",
        "  return edge_save_path\n",
        "\n",
        "\n",
        "edge_save_path=talk(text)\n",
        "Audio(edge_save_path, autoplay=True)"
      ]
    },
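    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "The Gradio cell further down calls a `phi_demo` function that is not defined anywhere in this notebook. The cell below is only a minimal sketch of what such a function might look like, assuming the `model`, `tokenizer`, and `pipeline` names from the model-loading cell. The `phi_pipe` name and the generation settings (`max_new_tokens=500`, greedy decoding) are assumptions for illustration, not values from the original notebook."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# Hypothetical helper: a text-generation pipeline built from the already loaded model and tokenizer\n",
        "phi_pipe = pipeline(\"text-generation\", model=model, tokenizer=tokenizer)\n",
        "\n",
        "def phi_demo(prompt):\n",
        "    # Wrap the prompt as a single-turn chat so the tokenizer's chat template is applied\n",
        "    messages = [{\"role\": \"user\", \"content\": prompt}]\n",
        "    # Assumed settings: up to 500 new tokens, greedy decoding, reply text only\n",
        "    output = phi_pipe(messages, max_new_tokens=500, return_full_text=False, do_sample=False)\n",
        "    # With return_full_text=False, generated_text holds only the newly generated reply\n",
        "    return output[0][\"generated_text\"].strip()"
      ]
    },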
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "The Gradio cell below implements three functions, `convert_to_text`, `run_text_prompt`, and `run_audio_prompt`, and builds the chat interface. It relies on two classes: the built-in `str` and `Audio` from `IPython.display`.\n",
        "\n",
        "The `convert_to_text` function takes an `audio_path` as input and transcribes the audio to text using `whisper_model`. The function first checks the `gpu` flag. If it is `True`, `whisper_model.transcribe` is called with `word_timestamps=True`, `fp16=True`, `language='English'`, and `task='translate'`, so the output text is in English. If the `gpu` flag is `False`, the same call is made with `fp16=False`. The full result is written to a file named `scan.txt`, and the transcribed text (`result[\"text\"]`) is returned.\n",
        "\n",
        "The `run_text_prompt` function takes a `message` and a `chat_history` as input. It calls `phi_demo`, which must be defined before this cell runs, to generate a chatbot response to the message. The response is passed to the `talk` function, which converts it into an audio file and returns the file path. The audio is shown with the `display` function from `IPython.display`, and the `Audio` object is created with `autoplay=True`, so playback starts automatically. The `chat_history` is updated with the input message and the generated response, and an empty string and the updated `chat_history` are returned. The `run_audio_prompt` function handles microphone input: it transcribes the recording with `convert_to_text` and passes the text to `run_text_prompt`.\n",
        "\n",
        "`str` is the built-in Python class that represents a sequence of characters. Here it is the type of the user message, the Whisper transcription, and the chatbot response that these functions pass to one another.\n",
        "\n",
        "`Audio` is the `IPython.display.Audio` class, which creates an audio player in the Jupyter Notebook environment. It accepts parameters such as `data`, `filename`, `url`, `embed`, `rate`, `autoplay`, and `normalize`. The `data` parameter can be a numpy array, a list of samples, a string representing a filename or URL, or raw PCM data. The `filename` parameter specifies a local file to load the audio data from, and the `url` parameter specifies a URL to download the audio data from. The `embed` parameter determines whether the audio data is embedded using a data URI or referenced from the original source. The `rate` parameter specifies the sampling rate of the audio data. The `autoplay` parameter determines whether the audio starts playing automatically. The `normalize` parameter specifies whether the audio data is normalized (rescaled) to the maximum possible range. The class also provides the `reload` method, which reloads the audio data from the file or URL, and the helper methods `src_attr`, `autoplay_attr`, and `element_id_attr`, which return the corresponding attributes for the HTML audio element.\n",
        "\n",
        "Overall, these functions and classes are used to transcribe audio to text, generate audio responses from a chatbot, and display and play audio in the Jupyter Notebook environment."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "cellView": "form",
        "colab": {
          "base_uri": "https://localhost:8080/"
        },
        "id": "0e6aTA6mk7Gi",
        "outputId": "4c4825c9-f1ef-4d9e-d294-83d67248e073"
      },
      "outputs": [],
      "source": [
        "#@title Run gradio app\n",
        "def convert_to_text(audio_path):\n",
        "  # Use half precision (fp16) only when a CUDA GPU is actually available\n",
        "  gpu=torch.cuda.is_available()\n",
        "  if gpu:\n",
        "    result = whisper_model.transcribe(audio_path,word_timestamps=True,fp16=True,language='English',task='translate')\n",
        "  else:\n",
        "    result = whisper_model.transcribe(audio_path,word_timestamps=True,fp16=False,language='English',task='translate')\n",
        "  with open('scan.txt', 'w') as file:\n",
        "    file.write(str(result))\n",
        "  return result[\"text\"]\n",
        "\n",
        "\n",
        "import gradio as gr\n",
        "from IPython.display import Audio, display\n",
        "def run_text_prompt(message, chat_history):\n",
        "    bot_message = phi_demo(message)\n",
        "    edge_save_path=talk(bot_message)\n",
        "    # print(edge_save_path)\n",
        "    display(Audio(edge_save_path, autoplay=True))\n",
        "\n",
        "    chat_history.append((message, bot_message))\n",
        "    return \"\", chat_history\n",
        "\n",
        "\n",
        "def run_audio_prompt(audio, chat_history):\n",
        "    if audio is None:\n",
        "        return None, chat_history\n",
        "    print(audio)\n",
        "    message_transcription = convert_to_text(audio)\n",
        "    _, chat_history = run_text_prompt(message_transcription, chat_history)\n",
        "    return None, chat_history\n",
        "\n",
        "\n",
        "with gr.Blocks() as demo:\n",
        "    chatbot = gr.Chatbot(label=\"Chat with Phi 3 mini 4k instruct\")\n",
        "\n",
        "    msg = gr.Textbox(label=\"Ask anything\")\n",
        "    msg.submit(run_text_prompt, [msg, chatbot], [msg, chatbot])\n",
        "\n",
        "    with gr.Row():\n",
        "        audio = gr.Audio(sources=\"microphone\", type=\"filepath\")\n",
        "\n",
        "        send_audio_button = gr.Button(\"Send Audio\", interactive=True)\n",
        "        send_audio_button.click(run_audio_prompt, [audio, chatbot], [audio, chatbot])\n",
        "\n",
        "demo.launch(share=True,debug=True)"
      ]
    }
  ],
  "metadata": {
    "accelerator": "GPU",
    "colab": {
      "gpuType": "T4",
      "provenance": []
    },
    "kernelspec": {
      "display_name": "Python 3",
      "name": "python3"
    },
    "language_info": {
      "codemirror_mode": {
        "name": "ipython",
        "version": 3
      },
      "file_extension": ".py",
      "mimetype": "text/x-python",
      "name": "python",
      "nbconvert_exporter": "python",
      "pygments_lexer": "ipython3",
      "version": "3.11.9"
    }
  },
  "nbformat": 4,
  "nbformat_minor": 0
}
